package com.huan.study.redis.pubsub.config;

import com.huan.study.redis.pubsub.listener.ConsumerListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.RedisSerializer;

import java.util.concurrent.Executors;

/**
 * redis 发布订阅配置
 *
 * @author huan.fu 2021/11/12 - 下午4:32
 */
@Configuration
public class RedisPubSubConfiguration {
    
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        
        /**
         * 基于 channel 订阅，后方的 * 不是匹配，是发布消息的时候,channel必须是 fruit.* 才能匹配上
         */
        container.addMessageListener(new ConsumerListener("A"), new ChannelTopic("fruit.*"));
        /**
         * 基于 pattern 订阅
         * * 表示0个或多个字符
         */
        container.addMessageListener(new ConsumerListener("B"), new PatternTopic("fruit.*"));
        /**
         * 基于 pattern 订阅
         * ? 表示1个占位符
         */
        container.addMessageListener(new ConsumerListener("C"), new PatternTopic("fruit.?"));
        /**
         * 基于 pattern 订阅
         * ?* 表示1个或1个以上的占位符
         */
        container.addMessageListener(new ConsumerListener("D"), new PatternTopic("fruit.?*"));
        
        // 处理错误
        container.setErrorHandler(new PubsubErrorHandler());
        
        // 设置一个线程池，将接收到的消息使用这个线程池处理，即MessageListener里的消息是这个线程池里处理的
        container.setTaskExecutor(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
        
        container.setSubscriptionExecutor(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
        
        container.setTopicSerializer(RedisSerializer.string());
        
        return container;
    }
}
